[SPARK-54405][SQL][Metric View] CREATE command and SELECT query resolution #53158
linhongliu-db wants to merge 17 commits into apache:master
cc @cloud-fan to review
mode DOLLAR_QUOTED_STRING_MODE;

DOLLAR_QUOTED_STRING_BODY
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Measure.scala
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
val name = child match {
  case v: ResolvedIdentifier =>
    v.identifier.asTableIdentifier
  case _ => throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError()
this should not happen, right? CheckAnalysis should fail earlier. We can throw internal error here.
you are right, removed
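For reference, a minimal sketch of the internal-error pattern suggested above (hedged; the surrounding command code is abridged and the exact message is illustrative):

import org.apache.spark.SparkException

val name = child match {
  case v: ResolvedIdentifier => v.identifier.asTableIdentifier
  // Unreachable if CheckAnalysis rejected the plan earlier; report as an
  // internal error rather than a user-facing compilation error.
  case other =>
    throw SparkException.internalError(
      s"Unexpected plan node when resolving metric view command: $other")
}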
- Add METRICS keyword to lexer
- Add dollar-quoted string support for YAML definitions
- Add createMetricView production rule
- Add METRIC_VIEW catalog table type
- Implement CreateMetricViewCommand:
  - Parse YAML definition
  - Build MetricViewPlaceholder logical node
  - Validate and analyze metric view
- Add MetricViewPlaceholder logical node with tree patterns
- Update ViewHelper to support metric view creation
- Add basic test suite for metric views
- Add MetricViewPlanner utility:
  - planRead() to parse metric view for SELECT queries
  - planWrite() refactored from metricViewCommands
  - parseYAML() shared parsing logic
- Add ResolveMetricView analyzer rule:
  - Transform MetricViewPlaceholder into aggregation queries
  - Parse dimensions and measures from schema metadata
  - Build Project with dimensions and Aggregate with measures
  - Handle measure references in aggregates
- Update SessionCatalog to parse metric view definitions on read
- Update EliminateView to handle ResolvedMetricView nodes
- Refactor CreateMetricViewCommand to use MetricViewPlanner
- Update ViewHelper to set METRIC_VIEW table type correctly
- Add ResolveMetricView to analyzer rule chain
- Update test suite with query tests
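For context, a hedged sketch of what end-to-end usage might look like, assuming the clause order suggested by the grammar pieces above (the view name, source table, and YAML keys are illustrative, not taken from this PR):

// Illustrative only: clause order and YAML schema are assumptions.
// `spark` is a SparkSession.
spark.sql(
  """CREATE VIEW sales_metrics
    |WITH METRICS
    |LANGUAGE YAML
    |AS $$
    |source: sales_table
    |dimensions:
    |  - name: region
    |    expr: region
    |measures:
    |  - name: total_count
    |    expr: sum(count)
    |$$
    |""".stripMargin)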
override def prettyName: String = getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("measure")

override def nullable: Boolean = true
should it be child.nullable?
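A minimal sketch of the suggested change, assuming Measure wraps a single child expression (illustrative shape, not the PR's actual class):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.DataType

// Hypothetical shape; only the nullable override is the point here.
case class Measure(child: Expression) extends UnaryExpression {
  override def dataType: DataType = child.dataType
  // Follow the wrapped expression's nullability instead of hardcoding true.
  override def nullable: Boolean = child.nullable
  // Pass-through evaluation, just to keep the sketch self-contained.
  override def eval(input: InternalRow): Any = child.eval(input)
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    child.genCode(ctx)
  override protected def withNewChildInternal(newChild: Expression): Measure =
    copy(child = newChild)
}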
sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
  }
}

override def visitCodeLiteral(ctx: CodeLiteralContext): String = {
shall we put this in AstBuilder?
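If this moves into AstBuilder, the override might look roughly as follows (hedged: the generated-context accessor for the body token is assumed from the lexer rules shown earlier):

// Sketch only. Assumes the codeLiteral grammar rule exposes the dollar-quoted
// body as a single DOLLAR_QUOTED_STRING_BODY token.
override def visitCodeLiteral(ctx: CodeLiteralContext): String = withOrigin(ctx) {
  ctx.DOLLAR_QUOTED_STRING_BODY().getText
}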
if (ctx.METRICS(0) == null) {
  throw QueryParsingErrors.missingClausesForOperation(
    ctx, "WITH METRICS", "CREATE METRIC VIEW")
This is a bit misleading as we don't really support the syntax CREATE METRIC VIEW, shall we just say metric view creation?
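i.e. something along these lines (sketch of the suggested wording only):

// Name the operation plainly instead of implying a "CREATE METRIC VIEW" syntax.
if (ctx.METRICS(0) == null) {
  throw QueryParsingErrors.missingClausesForOperation(
    ctx, "WITH METRICS", "metric view creation")
}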
if (ctx.routineLanguage(0) == null) {
  throw QueryParsingErrors.missingClausesForOperation(
    ctx, "LANGUAGE", "CREATE METRIC VIEW")
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
}

val languageCtx = ctx.routineLanguage(0)
withOrigin(languageCtx) {
    .getOrElse(Map.empty)
val codeLiteral = visitCodeLiteral(ctx.codeLiteral())

withIdentClause(ctx.identifierReference(), ident => {
I think it's simpler to do
CreateMetricViewCommand(
  withIdentClause(...),
  userSpecifiedColumns,
  visitCommentSpecList(ctx.commentSpec()),
  properties,
  codeLiteral,
  allowExisting = ctx.EXISTS != null,
  replace = ctx.REPLACE != null
)
 * groupingExpressions = [region],
 * aggregateExpressions = [region, sum(amount), avg(price)],
 * child = Filter(upper(region) = 'REGION_1',
 *           Filter(product = 'product_1', sales_table))
where is the aforementioned Project?
Added, and fixed the upper filter expression to use the dimension AttributeReference.
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override val output: Seq[Attribute] = Seq(
  AttributeReference("result", StringType, nullable = false)()
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
 * 2. Load and parse the stored metric view definition from catalog metadata
 * 3. Build a [[Project]] node that:
 *    - Projects dimension expressions: [region, upper(region) AS region_upper]
 *    - Includes non-conflicting source columns for filters
but in the example, we filter by region_upper. I think the main reason is for the measure agg functions to reference columns?
Yes, that's right. I updated the comment to make it clearer.
// 3. metric view output should use the same exprId
val sourceProjList = sourceOutput.filterNot { attr =>
  // conflict with dimensions
  metricView.outputMetrics
outputMetrics contains both dimension and measure columns, shall we filter out dimension columns first before we look up the column name?
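A hedged sketch of that suggestion (the isDimension helper and the metadata value it checks are hypothetical; the real code would use whatever value the PR stores under COLUMN_TYPE_PROPERTY_KEY):

// Hypothetical helper: treat a column as a dimension based on its metadata.
def isDimension(attr: Attribute): Boolean =
  attr.metadata.contains(MetricViewConstants.COLUMN_TYPE_PROPERTY_KEY) &&
    attr.metadata.getString(MetricViewConstants.COLUMN_TYPE_PROPERTY_KEY) == "dimension"

// Only dimension columns should shadow same-named source columns.
val dimensionNames = metricView.outputMetrics.filter(isDimension).map(_.name).toSet
val sourceProjList = sourceOutput.filterNot(attr => dimensionNames.contains(attr.name))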
if (attr.metadata.contains(MetricViewConstants.COLUMN_TYPE_PROPERTY_KEY)) {
  // no alias for metric view column since the measure reference needs to use the
  // measure column in MetricViewPlaceholder, but an alias will change the exprId
  attr
then will it have issues with DeduplicateRelation?
I added a test case to verify that a union of two identical metric view queries works well.
In the test, before DeduplicateRelation, the plan is:
Union false, false
:- Aggregate [region#1903], [region#1903, sum(count#1910) AS total_count#1901L, avg(price#1911) AS avg_price#1902]
: +- ResolvedMetricView `spark_catalog`.`default`.`test_metric_view`
: +- Project [count#23 AS count#1910, price#24 AS price#1911, cast(region#21 as string) AS region#1903, cast(product#22 as string) AS product#1904, cast(upper(region#21) as string) AS region_upper#1905]
: +- SubqueryAlias spark_catalog.default.test_table
: +- Relation spark_catalog.default.test_table[region#21,product#22,count#23,price#24] parquet
+- Aggregate [region#1903], [region#1903, sum(count#1910) AS total_count#1901L, avg(price#1911) AS avg_price#1902]
+- ResolvedMetricView `spark_catalog`.`default`.`test_metric_view`
+- Project [count#23 AS count#1910, price#24 AS price#1911, cast(region#21 as string) AS region#1903, cast(product#22 as string) AS product#1904, cast(upper(region#21) as string) AS region_upper#1905]
+- SubqueryAlias spark_catalog.default.test_table
+- Relation spark_catalog.default.test_table[region#21,product#22,count#23,price#24] parquet
After DeduplicateRelation, the plan is:
region: string, total_count: bigint, avg_price: double
Union false, false
:- Aggregate [region#1903], [region#1903, sum(count#1910) AS total_count#1901L, avg(price#1911) AS avg_price#1902]
: +- ResolvedMetricView `spark_catalog`.`default`.`test_metric_view`
: +- Project [count#23 AS count#1910, price#24 AS price#1911, cast(region#21 as string) AS region#1903, cast(product#22 as string) AS product#1904, cast(upper(region#21) as string) AS region_upper#1905]
: +- SubqueryAlias spark_catalog.default.test_table
: +- Relation spark_catalog.default.test_table[region#21,product#22,count#23,price#24] parquet
+- Aggregate [region#1920], [region#1920, sum(count#1918) AS total_count#1923L, avg(price#1919) AS avg_price#1924]
+- ResolvedMetricView `spark_catalog`.`default`.`test_metric_view`
+- Project [count#1916 AS count#1918, price#1917 AS price#1919, cast(region#1914 as string) AS region#1920, cast(product#1915 as string) AS product#1921, cast(upper(region#1914) as string) AS region_upper#1922]
+- SubqueryAlias spark_catalog.default.test_table
+- Relation spark_catalog.default.test_table[region#1914,product#1915,count#1916,price#1917] parquet
then why do we need to add alias at all?
This is not actually needed in this PR. But to support GROUP BY grouping sets, there will be an Expand + Project between the Aggregate and the ResolvedMetricView node, and I found that without this alias, DeduplicateRelation fails to update the exprIds in the Expand + Project, leading to dangling references.
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
}.map { attr =>
  if (attr.metadata.contains(MetricViewConstants.COLUMN_TYPE_PROPERTY_KEY)) {
    // no alias for metric view column since the measure reference needs to use the
    // measure column in MetricViewPlaceholder, but an alias will change the exprId
MetricViewPlaceholder.output is fully decoupled from its child. To not change the exprId, we need to add an alias that retains the original exprId, and you are doing the opposite. The Project should be constructed like this:
Project(
  Seq(
    Alias(source_attr_1, name)(exprId = output_metric_1.exprId),
    ...
  ),
  source
)
Actually this is similar to how we output dimension cols, see https://github.com/apache/spark/pull/53158/files#diff-1f33f825cb8bc9e947d5f021b7e21ec37996d97b2d65375dd0c430a38f1c7c25R336
Oh, actually here we are adding new columns that are not in outputMetrics, so why does the exprId matter?
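For reference, a concrete version of the Project construction sketched in the comment above (illustrative; the name-based lookup and surrounding variables are assumptions, and case-sensitivity handling is elided):

import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.Project

// Re-alias each source attribute under the exprId already exposed by
// MetricViewPlaceholder.output, so references resolved against the
// placeholder stay valid after it is replaced.
val projectList: Seq[NamedExpression] = metricView.outputMetrics.map { out =>
  val sourceAttr = source.output.find(_.name == out.name).getOrElse(
    throw new IllegalStateException(s"Missing source column: ${out.name}"))
  Alias(sourceAttr, out.name)(exprId = out.exprId)
}
Project(projectList, source)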
thanks, merging to master!
What changes were proposed in this pull request?
This PR implements the command to create metric views and the analysis rule to resolve a metric view query:
- Support WITH METRICS when creating a view
NOTE: This PR depends on #53146
This PR also marks org.apache.spark.sql.metricview as an internal package.
Why are the changes needed?
SPIP: Metrics & semantic modeling in Spark
Does this PR introduce any user-facing change?
No
Was this patch authored or co-authored using generative AI tooling?
No